package com.example.sparkdemo.streaming

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable

/**
 *
 *
 * @author zhang.t.c
 * @date 2021/8/19
 */
object StreamingTest2 {

  def main(args: Array[String]): Unit = {
    val sc: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingTest")
    val ssc = new StreamingContext(sc, Seconds(4))

    val rddQueue = new mutable.Queue[RDD[Int]]()

    val inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)

    val wordcount: DStream[(Int, Int)] = inputStream.map((_, 1)).reduceByKey(_ + _)

    wordcount.print()

    ssc.start()

    for (i <- 1 to 5) {
      rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
      Thread.sleep(2000)
    }

    ssc.awaitTermination()

  }

}
